/**
 * Copyright (C) 2010-2013 Alibaba Group Holding Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package cn.ac.iie.dpl.data.mmloader.mq;

import cn.ac.iie.dpl.data.mmloader.DplDataMmloader;
import cn.ac.iie.dpl.data.mmloader.globalParas.GlobalParas;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.Utf8;

public class RocketConsumer {

    private final Logger logger = Logger.getLogger(RocketConsumer.class.getName());
    // Schema identifier of the form "<db>.<table>" — split on '.' in stratUp().
    private String schemaName = null;
    // Sink for decoded Avro records; presumably drained by a loader thread elsewhere — confirm against caller.
    private LinkedBlockingQueue<GenericRecord> result = null;
    private DefaultMQPushConsumer consumer;

    /**
     * @param schemaName schema identifier, expected to contain a '.' (e.g. "db.table")
     * @param result     queue that decoded records are offered into
     */
    @SuppressWarnings("unchecked")
    public RocketConsumer(String schemaName, LinkedBlockingQueue result) {
        this.schemaName = schemaName;
        this.result = result;
    }

    /**
     * Creates, configures, subscribes and starts the push consumer for this schema.
     * Incoming messages are Avro "docs" envelopes; each inner doc is decoded into a
     * {@link GenericRecord} and offered to the result queue.
     *
     * <p>NOTE: the method name keeps the historical typo "stratUp" so existing
     * callers are not broken.
     *
     * @throws MQClientException if the RocketMQ client fails to subscribe or start
     */
    public void stratUp() throws MQClientException {
        consumer = new DefaultMQPushConsumer("dpl_mmloader_" + GlobalParas.region + "_" + schemaName.split("[.]")[1]);
        consumer.setNamesrvAddr(GlobalParas.nameServer);
        consumer.setClientIP(GlobalParas.machineIP);
        consumer.setInstanceName(GlobalParas.machineIP + "_" + "dpl_mmloader_" + GlobalParas.region + "_" + schemaName);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeThreadMin(4);
        consumer.setConsumeThreadMax(8);
        consumer.setClientCallbackExecutorThreads(10);
        consumer.setPullInterval(50L);
        consumer.setPullThresholdForQueue(10);
        consumer.setPullBatchSize(64);
        consumer.setConsumeConcurrentlyMaxSpan(65535);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    GenericRecord record = null;
                    try {
                        // Decode the outer "docs" envelope from the raw message body.
                        byte[] binaryData = msg.getBody();
                        ByteArrayInputStream docsbis = new ByteArrayInputStream(binaryData);
                        // DecoderFactory.get() replaces the deprecated new DecoderFactory().
                        BinaryDecoder docsbd = DecoderFactory.get().binaryDecoder(docsbis, null);
                        GenericRecord docsRecord = new GenericData.Record(GlobalParas.dpldocsSchema);
                        GenericDatumReader<GenericRecord> docsReader =
                                new GenericDatumReader<GenericRecord>(GlobalParas.dpldocsSchema);
                        docsReader.read(docsRecord, docsbd);
                        String doc_schema_name = docsRecord.get("doc_schema_name").toString();
                        // BUGFIX: was contains(schemaName). On Hashtable/ConcurrentHashMap,
                        // contains(Object) tests VALUES, not keys, so the check was always
                        // false and the cache was re-populated for every message.
                        if (!GlobalParas.srcToDocDesc.containsKey(schemaName)) {
                            @SuppressWarnings("unchecked")
                            Map<Utf8, Utf8> docDesc = (Map<Utf8, Utf8>) docsRecord.get("doc_desc");
                            GlobalParas.srcToDocDesc.put(schemaName, docDesc);
                        }
                        // BUGFIX: same contains -> containsKey correction as above.
                        if (!GlobalParas.srcToDesDocDesc.containsKey(schemaName)) {
                            // Build a String->String copy of the Utf8 column-description map.
                            Map<String, String> desDocDesc = new HashMap<String, String>();
                            Map<Utf8, Utf8> docDesc = GlobalParas.srcToDocDesc.get(schemaName);
                            for (Map.Entry<Utf8, Utf8> entry : docDesc.entrySet()) {
                                desDocDesc.put(entry.getKey().toString(), entry.getValue().toString());
                            }
                            GlobalParas.srcToDesDocDesc.put(schemaName, desDocDesc);
                        }
                        if (GlobalParas.srcSchemaName.contains(doc_schema_name)) {
                            // Each element of doc_set is an Avro-encoded single doc (ByteBuffer).
                            GenericArray<?> docsset = (GenericArray<?>) docsRecord.get("doc_set");
                            logger.debug("This table " + doc_schema_name + " msgid is " + msg.getMsgId());
                            logger.debug("This table " + doc_schema_name + " msgkey is " + msg.getKeys());
                            Iterator<?> itor = docsset.iterator();
                            logger.info("get " + docsset.size() + " " + schemaName + " messages");
                            DatumReader<GenericRecord> singleDocsReader =
                                    new GenericDatumReader<GenericRecord>(GlobalParas.dpldocSchema);
                            while (itor.hasNext()) {
                                ByteArrayInputStream databis = new ByteArrayInputStream(((ByteBuffer) itor.next()).array());
                                BinaryDecoder dataDecoder = DecoderFactory.get().binaryDecoder(databis, null);
                                try {
                                    record = (GenericRecord) singleDocsReader.read(null, dataDecoder);
                                    // offer() never blocks; NOTE(review): on an unbounded queue it
                                    // cannot fail, but on a bounded one records would be dropped
                                    // silently — confirm queue capacity with the owner.
                                    result.offer(record);
                                } catch (Exception ex) {
                                    logger.error(ex.getMessage(), ex);
                                }
                            }
                        } else {
                            logger.warn("why " + doc_schema_name + " in this topic.");
                        }
                    } catch (Exception ex) {
                        // BUGFIX: record.toString() NPE'd when the failure happened before
                        // any per-doc decode succeeded, masking the original exception.
                        logger.error("split the one data " + schemaName + " in the dataPool wrong " + ex
                                + " re: " + (record == null ? "null" : record.toString()), ex);
                    }
                }
                // Always acknowledge the batch; failed records are only logged (best-effort).
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        logger.info("init consumer for schema " + schemaName + " successfully");
        consumer.subscribe(GlobalParas.srcTopicMap.get(schemaName), "*");
        DplDataMmloader.srcConsumerMap.put(schemaName, this);
        consumer.start();
        logger.info("consumer " + schemaName + " has started successfully");
    }

    /** Shuts the underlying push consumer down. */
    public void stop() {
        consumer.shutdown();
        logger.info("consumer " + schemaName + " has stopped successfully");
    }
}
